// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io/fs/packed_file_manager.h"

#include <bvar/bvar.h>
#include <bvar/recorder.h>
#include <bvar/window.h>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <ctime>
#include <functional>
#include <limits>
#include <optional>
#include <random>
#include <sstream>
#include <unordered_set>

#ifdef BE_TEST
#include "cpp/sync_point.h"
#endif

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "gen_cpp/cloud.pb.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "util/coding.h"
#include "util/uid_util.h"

namespace doris::io {

namespace {

// Cumulative counters, updated once per packed file in record_packed_file_metrics():
// number of packed files, number of small files packed, and total packed bytes.
bvar::Adder<int64_t> g_packed_file_total_count("packed_file", "total_count");
bvar::Adder<int64_t> g_packed_file_total_small_file_count("packed_file", "total_small_file_num");
bvar::Adder<int64_t> g_packed_file_total_size_bytes("packed_file", "total_size_bytes");
// Per-packed-file samples (small file count and total size). The recorders themselves are
// not given a name here; they are published through the Window wrappers below.
bvar::IntRecorder g_packed_file_small_file_num_recorder;
bvar::IntRecorder g_packed_file_file_size_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_avg_small_file_num(
        "packed_file_avg_small_file_num", &g_packed_file_small_file_num_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_avg_file_size_bytes("packed_file_avg_file_size_bytes",
                                                                  &g_packed_file_file_size_recorder,
                                                                  /*window_size=*/10);
// Latency samples (milliseconds) for each state transition of a packed file:
// first append -> READY_TO_UPLOAD, READY_TO_UPLOAD -> UPLOADING, UPLOADING -> UPLOADED.
bvar::IntRecorder g_packed_file_active_to_ready_ms_recorder;
bvar::IntRecorder g_packed_file_ready_to_upload_ms_recorder;
bvar::IntRecorder g_packed_file_uploading_to_uploaded_ms_recorder;
bvar::Window<bvar::IntRecorder> g_packed_file_active_to_ready_ms_window(
        "packed_file_active_to_ready_ms", &g_packed_file_active_to_ready_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_ready_to_upload_ms_window(
        "packed_file_ready_to_upload_ms", &g_packed_file_ready_to_upload_ms_recorder,
        /*window_size=*/10);
bvar::Window<bvar::IntRecorder> g_packed_file_uploading_to_uploaded_ms_window(
        "packed_file_uploading_to_uploaded_ms", &g_packed_file_uploading_to_uploaded_ms_recorder,
        /*window_size=*/10);

// Appends the packed-file trailer to `writer`.
//
// The trailer is the serialized PackedFileInfoPB followed by its byte length encoded with
// put_fixed32_le, so the length field is always the last four bytes written.
//
// Returns InternalError when the writer is null or already closed, when serialization
// fails, or when the serialized message does not fit in a uint32_t length; otherwise
// returns whatever FileWriter::append reports.
Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_file_path,
                                  const cloud::PackedFileInfoPB& packed_file_info) {
    if (writer == nullptr) {
        return Status::InternalError("File writer is null for packed file: {}", packed_file_path);
    }
    const bool writer_closed = writer->state() == FileWriter::State::CLOSED;
    if (writer_closed) {
        return Status::InternalError("File writer already closed for packed file: {}",
                                     packed_file_path);
    }

    std::string payload;
    const bool serialized = packed_file_info.SerializeToString(&payload);
    if (!serialized) {
        return Status::InternalError("Failed to serialize packed file info for {}",
                                     packed_file_path);
    }

    // The length field is 32 bits wide; refuse anything that would be truncated.
    const size_t payload_size = payload.size();
    if (payload_size > std::numeric_limits<uint32_t>::max()) {
        return Status::InternalError("PackedFileInfoPB too large for {}", packed_file_path);
    }

    std::string trailer_bytes;
    trailer_bytes.reserve(payload_size + sizeof(uint32_t));
    trailer_bytes += payload;
    put_fixed32_le(&trailer_bytes, static_cast<uint32_t>(payload_size));

    return writer->append(Slice(trailer_bytes));
}

} // namespace

// Returns the process-wide PackedFileManager. The object is a function-local static, so it
// is constructed on the first call and the same address is returned on every call.
PackedFileManager* PackedFileManager::instance() {
    static PackedFileManager singleton;
    return &singleton;
}

// Stops and joins the background thread (if one was started) before members are destroyed.
PackedFileManager::~PackedFileManager() {
    stop_background_manager();
}

// Currently a no-op that always succeeds. The background thread is started separately via
// start_background_manager(), and file systems are resolved on demand by ensure_file_system().
Status PackedFileManager::init() {
    return Status::OK();
}

// Creates a fresh packed file context for `resource_id` and stores it in `packed_file_ctx`.
//
// The file lives at "data/packed_file/<bucket>/<uuid>.bin", where <bucket> is derived from
// the hash of the uuid and falls in [1, 4096]. The new context starts in INIT state with a
// writer opened on the resolved file system.
//
// Note: `packed_file_ctx` is replaced before the writer is created, so if create_file fails
// the output holds a context without a writer. Callers in this file treat a missing writer
// as "no usable context" and create a new one.
//
// Returns the error from ensure_file_system / create_file, or InternalError when no file
// system could be resolved.
Status PackedFileManager::create_new_packed_file_context(
        const std::string& resource_id, std::unique_ptr<PackedFileContext>& packed_file_ctx) {
    FileSystemSPtr fs;
    RETURN_IF_ERROR(ensure_file_system(resource_id, &fs));
    if (fs == nullptr) {
        return Status::InternalError("File system is not available for packed file creation: " +
                                     resource_id);
    }

    // Spread packed files over 4096 buckets (numbered from 1) keyed by the uuid hash.
    const std::string file_uuid = generate_uuid_string();
    const uint16_t bucket = std::hash<std::string> {}(file_uuid) % 4096 + 1;
    std::string object_path = "data/packed_file/";
    object_path += std::to_string(bucket);
    object_path += "/";
    object_path += file_uuid;
    object_path += ".bin";

    packed_file_ctx = std::make_unique<PackedFileContext>();
    PackedFileContext& ctx = *packed_file_ctx;
    ctx.packed_file_path = object_path;
    ctx.create_time = std::time(nullptr);
    ctx.create_timestamp = std::chrono::steady_clock::now();
    ctx.state = PackedFileState::INIT;
    ctx.resource_id = resource_id;
    ctx.file_system = std::move(fs);

    // Open the writer last; on failure the context above is left without a writer.
    FileWriterPtr opened_writer;
    FileWriterOptions writer_opts;
    RETURN_IF_ERROR(ctx.file_system->create_file(Path(object_path), &opened_writer, &writer_opts));
    ctx.writer = std::move(opened_writer);

    return Status::OK();
}

// Resolves the file system used for packed files of `resource_id` and writes it to
// `*file_system`.
//
// An empty `resource_id` selects the default file system (the cloud engine's latest_fs());
// a non-empty one is looked up through get_storage_resource(). Resolved file systems are
// cached in this manager under _file_system_mutex, so the storage engine is consulted only
// on a cache miss.
//
// Returns InvalidArgument when `file_system` is null, and InternalError when not running in
// cloud mode, when ExecEnv is missing, or when the file system is not ready yet.
Status PackedFileManager::ensure_file_system(const std::string& resource_id,
                                             FileSystemSPtr* file_system) {
    if (file_system == nullptr) {
        return Status::InvalidArgument("file_system output parameter is null");
    }

    const bool use_default = resource_id.empty();

    // Fast path: serve from the cache.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (use_default) {
            if (_default_file_system != nullptr) {
                *file_system = _default_file_system;
                return Status::OK();
            }
        } else if (auto cached = _file_systems.find(resource_id); cached != _file_systems.end()) {
            *file_system = cached->second;
            return Status::OK();
        }
    }

    // Slow path: ask the cloud storage engine. The cache lock is not held while doing so.
    if (!config::is_cloud_mode()) {
        return Status::InternalError("Cloud file system is not available in local mode");
    }

    auto* exec_env = ExecEnv::GetInstance();
    if (exec_env == nullptr) {
        return Status::InternalError("ExecEnv instance is not initialized");
    }

    auto& cloud_engine = exec_env->storage_engine().to_cloud();
    FileSystemSPtr resolved;
    if (use_default) {
        resolved = cloud_engine.latest_fs();
        if (resolved == nullptr) {
            return Status::InternalError("Cloud file system is not ready");
        }
    } else {
        auto resource = cloud_engine.get_storage_resource(resource_id);
        if (!resource.has_value() || resource->fs == nullptr) {
            return Status::InternalError("Cloud file system is not ready for resource: " +
                                         resource_id);
        }
        resolved = resource->fs;
    }

    // Publish the result to the cache and to the caller.
    {
        std::lock_guard<std::mutex> guard(_file_system_mutex);
        if (use_default) {
            _default_file_system = resolved;
        } else {
            _file_systems[resource_id] = resolved;
        }
        *file_system = resolved;
    }

    return Status::OK();
}

// Appends one small file to the current packed file of `info.resource_id`.
//
// - Files larger than config::small_file_threshold_bytes are not packed; the call returns
//   OK without recording anything, and the caller is expected to write them separately.
// - `info.txn_id` must be positive, otherwise InvalidArgument is returned.
// - If adding `data` would reach config::packed_file_size_threshold_bytes, the current
//   packed file is rotated (handed to the uploading list) before the write.
// - After the write, the packed file is rotated if its small file count has reached
//   config::packed_file_small_file_count_threshold (when that threshold is positive).
//
// On success the slice location is recorded both in the packed file context and in the
// global index, so that wait_upload_done() / get_packed_slice_location() can find `path`.
//
// Holds _current_packed_file_mutex for the whole call.
Status PackedFileManager::append_small_file(const std::string& path, const Slice& data,
                                            const PackedAppendContext& info) {
    // Check if file is too large to be merged
    if (data.get_size() > config::small_file_threshold_bytes) {
        return Status::OK(); // Skip merging for large files
    }

    if (info.txn_id <= 0) {
        return Status::InvalidArgument("Missing valid txn id for packed file append: " + path);
    }

    std::lock_guard<std::timed_mutex> lock(_current_packed_file_mutex);

    // `current_state` refers to the map slot, so it follows the slot when rotation replaces
    // the context stored in it.
    auto& current_state = _current_packed_files[info.resource_id];
    if (!current_state || !current_state->writer) {
        RETURN_IF_ERROR(create_new_packed_file_context(info.resource_id, current_state));
    }

    // Check if we need to create a new packed file
    if (current_state->total_size + data.get_size() >= config::packed_file_size_threshold_bytes) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
        auto it = _current_packed_files.find(info.resource_id);
        if (it == _current_packed_files.end() || !it->second || !it->second->writer) {
            RETURN_IF_ERROR(create_new_packed_file_context(
                    info.resource_id, _current_packed_files[info.resource_id]));
        }
    }

    PackedFileContext* active_state = current_state.get();

    // Write data to current packed file
    RETURN_IF_ERROR(active_state->writer->append(data));

    // Update index
    PackedSliceLocation location;
    location.packed_file_path = active_state->packed_file_path;
    location.offset = active_state->current_offset;
    location.size = data.get_size();
    location.tablet_id = info.tablet_id;
    location.rowset_id = info.rowset_id;
    location.resource_id = info.resource_id;
    location.txn_id = info.txn_id;

    active_state->slice_locations[path] = location;
    active_state->current_offset += data.get_size();
    active_state->total_size += data.get_size();

    // Mark as active if this is the first write. This must happen BEFORE the count-based
    // rotation below: rotation reads first_append_timestamp to record the active->ready
    // latency, and it hands the context to the uploading list, after which the background
    // thread may read it concurrently. `active_state` is therefore not touched after rotation.
    if (!active_state->first_append_timestamp.has_value()) {
        active_state->first_append_timestamp = std::chrono::steady_clock::now();
    }
    if (active_state->state == PackedFileState::INIT) {
        active_state->state = PackedFileState::ACTIVE;
    }

    // Rotate packed file when small file count reaches threshold
    if (config::packed_file_small_file_count_threshold > 0 &&
        static_cast<int64_t>(active_state->slice_locations.size()) >=
                config::packed_file_small_file_count_threshold) {
        RETURN_IF_ERROR(mark_current_packed_file_for_upload_locked(info.resource_id));
    }

    // Update global index
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        _global_slice_locations[path] = location;
    }

    return Status::OK();
}

// Blocks until `packed_file_ptr` reaches a terminal state (UPLOADED or FAILED).
//
// Waits on the context's upload_cv under upload_mutex. Returns OK for UPLOADED; for FAILED
// returns InternalError carrying the context's last_error, or a generic message when no
// error text was recorded. `packed_file_ptr` must be non-null and must stay alive for the
// duration of the call.
Status PackedFileManager::wait_for_packed_file_upload(PackedFileContext* packed_file_ptr) {
    PackedFileContext& ctx = *packed_file_ptr;
    std::unique_lock<std::mutex> guard(ctx.upload_mutex);

    const auto reached_terminal_state = [&ctx] {
        const auto observed = ctx.state.load(std::memory_order_acquire);
        return observed == PackedFileState::UPLOADED || observed == PackedFileState::FAILED;
    };
    // Re-check after every wake-up so spurious wake-ups are harmless.
    while (!reached_terminal_state()) {
        ctx.upload_cv.wait(guard);
    }

    if (ctx.state != PackedFileState::FAILED) {
        return Status::OK();
    }

    const std::string message =
            ctx.last_error.empty() ? std::string("Packed file upload failed") : ctx.last_error;
    return Status::InternalError(message);
}

// Blocks until the packed file that contains small file `path` has finished uploading.
//
// Lookup order:
//   1. The global index maps `path` to its packed file path (InternalError if absent).
//   2. The uploaded map: UPLOADED returns OK immediately; FAILED returns the failure.
//   3. The current (still being appended) packed files, then the uploading map, then the
//      uploaded map again, in case the file moved between steps.
// If the packed file is found in a non-terminal state the call waits on it via
// wait_for_packed_file_upload().
//
// Returns OK when the packed file is UPLOADED, and InternalError when the path or packed
// file cannot be found or when the upload failed.
Status PackedFileManager::wait_upload_done(const std::string& path) {
    std::string packed_file_path;
    {
        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        auto it = _global_slice_locations.find(path);
        if (it == _global_slice_locations.end()) {
            return Status::InternalError("File not found in global index: " + path);
        }
        packed_file_path = it->second.packed_file_path;
    }

    // Find the packed file in uploaded files first - if already uploaded, no need to wait
    std::shared_ptr<PackedFileContext> managed_packed_file;
    std::shared_ptr<PackedFileContext> failed_packed_file;
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
        if (uploaded_it != _uploaded_packed_files.end()) {
            auto state = uploaded_it->second->state.load(std::memory_order_acquire);
            if (state == PackedFileState::UPLOADED) {
                return Status::OK(); // Already uploaded, no need to wait
            }
            if (state == PackedFileState::FAILED) {
                failed_packed_file = uploaded_it->second;
            } else {
                managed_packed_file = uploaded_it->second;
            }
        }
    }

    if (failed_packed_file) {
        // The file is already in the terminal FAILED state, so the helper returns at once.
        // Going through it keeps the reported error (last_error, or the default message)
        // identical to what a caller that was blocked during the failure receives.
        return wait_for_packed_file_upload(failed_packed_file.get());
    }

    // Find the packed file in either current or uploading files
    PackedFileContext* packed_file_ptr = nullptr;
    {
        std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
        for (auto& [resource_id, state] : _current_packed_files) {
            if (state && state->packed_file_path == packed_file_path) {
                // NOTE(review): this is a non-owning pointer into _current_packed_files; the
                // wait below relies on the context outliving the wait - confirm.
                packed_file_ptr = state.get();
                break;
            }
        }
    }

    if (!packed_file_ptr) {
        // Both maps are checked under one lock, so a file moving from uploading to uploaded
        // cannot be missed here.
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        auto uploading_it = _uploading_packed_files.find(packed_file_path);
        if (uploading_it != _uploading_packed_files.end()) {
            managed_packed_file = uploading_it->second;
            packed_file_ptr = managed_packed_file.get();
        } else {
            auto uploaded_it = _uploaded_packed_files.find(packed_file_path);
            if (uploaded_it != _uploaded_packed_files.end()) {
                managed_packed_file = uploaded_it->second;
                packed_file_ptr = managed_packed_file.get();
            }
        }
    }

    if (!packed_file_ptr) {
        // Packed file not found in any location, this is unexpected
        return Status::InternalError("Packed file not found for path: " + path);
    }

    Status wait_status = wait_for_packed_file_upload(packed_file_ptr);
    (void)managed_packed_file; // keep shared ownership alive during wait
    return wait_status;
}

// Looks up where small file `path` was placed inside a packed file.
//
// On success copies the recorded PackedSliceLocation into `*location` and returns OK.
// Returns InvalidArgument when `location` is null and NotFound when `path` is not present
// in the global index. The lookup is done under _global_index_mutex.
Status PackedFileManager::get_packed_slice_location(const std::string& path,
                                                    PackedSliceLocation* location) {
    // Validate the output parameter up front, as ensure_file_system() does for its own.
    if (location == nullptr) {
        return Status::InvalidArgument("location output parameter is null");
    }

    std::lock_guard<std::mutex> lock(_global_index_mutex);
    auto it = _global_slice_locations.find(path);
    if (it == _global_slice_locations.end()) {
        return Status::NotFound("File not found in global packed index: {}", path);
    }

    *location = it->second;
    return Status::OK();
}

// Starts the background thread that runs background_manager(). Calling this while a thread
// object already exists is a no-op. The stop flag is cleared before the thread is launched.
void PackedFileManager::start_background_manager() {
    if (_background_thread == nullptr) {
        _stop_background_thread = false;
        _background_thread = std::make_unique<std::thread>([this] { background_manager(); });
    }
}

// Requests the background loop to exit, joins the thread if one is running, and releases
// the thread object so that start_background_manager() can be called again.
void PackedFileManager::stop_background_manager() {
    _stop_background_thread = true;

    const bool must_join = _background_thread != nullptr && _background_thread->joinable();
    if (must_join) {
        _background_thread->join();
    }

    _background_thread.reset();
}

// Rotates the current packed file of `resource_id`: marks it READY_TO_UPLOAD, moves it to
// the uploading map, and installs a freshly created context in its place.
//
// The caller must already hold _current_packed_file_mutex (see the non-"_locked" wrapper).
// Returns OK without doing anything when there is no current context with a writer for the
// resource; otherwise returns the result of creating the replacement context. Note that the
// old context has already been handed off by the time a creation failure is reported.
Status PackedFileManager::mark_current_packed_file_for_upload_locked(
        const std::string& resource_id) {
    auto entry = _current_packed_files.find(resource_id);
    const bool has_open_file = entry != _current_packed_files.end() && entry->second &&
                               entry->second->writer;
    if (!has_open_file) {
        return Status::OK(); // Nothing to rotate
    }

    auto& slot = entry->second;

    slot->state = PackedFileState::READY_TO_UPLOAD;
    // Stamp the transition (and record its latency) only the first time it happens.
    if (!slot->ready_to_upload_timestamp.has_value()) {
        const auto ready_at = std::chrono::steady_clock::now();
        slot->ready_to_upload_timestamp = ready_at;

        int64_t active_to_ready_ms = -1; // stays -1 when no append was ever timestamped
        if (slot->first_append_timestamp.has_value()) {
            const auto active_span = ready_at - *slot->first_append_timestamp;
            active_to_ready_ms =
                    std::chrono::duration_cast<std::chrono::milliseconds>(active_span).count();
            g_packed_file_active_to_ready_ms_recorder << active_to_ready_ms;
            if (auto* sampler = g_packed_file_active_to_ready_ms_recorder.get_sampler()) {
                sampler->take_sample();
            }
        }
        LOG(INFO) << "Packed file " << slot->packed_file_path
                  << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
                  << active_to_ready_ms;
    }

    // Transfer ownership from the map slot to the shared uploading map. After this the slot
    // holds no context until the replacement is created below.
    {
        std::shared_ptr<PackedFileContext> handed_off(std::move(slot));
        std::lock_guard<std::mutex> guard(_packed_files_mutex);
        _uploading_packed_files[handed_off->packed_file_path] = handed_off;
    }

    return create_new_packed_file_context(resource_id, slot);
}

// Locking wrapper around mark_current_packed_file_for_upload_locked(): acquires
// _current_packed_file_mutex (blocking) and rotates the current packed file of `resource_id`.
Status PackedFileManager::mark_current_packed_file_for_upload(const std::string& resource_id) {
    std::unique_lock<std::timed_mutex> guard(_current_packed_file_mutex);
    Status rotate_status = mark_current_packed_file_for_upload_locked(resource_id);
    return rotate_status;
}

void PackedFileManager::record_packed_file_metrics(const PackedFileContext& packed_file) {
    g_packed_file_total_count << 1;
    g_packed_file_total_small_file_count
            << static_cast<int64_t>(packed_file.slice_locations.size());
    g_packed_file_total_size_bytes << packed_file.total_size;
    g_packed_file_small_file_num_recorder
            << static_cast<int64_t>(packed_file.slice_locations.size());
    g_packed_file_file_size_recorder << packed_file.total_size;
    // Flush samplers immediately so the window bvar reflects the latest packed file.
    if (auto* sampler = g_packed_file_small_file_num_recorder.get_sampler()) {
        sampler->take_sample();
    }
    if (auto* sampler = g_packed_file_file_size_recorder.get_sampler()) {
        sampler->take_sample();
    }
}

void PackedFileManager::background_manager() {
    auto last_cleanup_time = std::chrono::steady_clock::now();

    while (!_stop_background_thread.load()) {
        int64_t check_interval_ms =
                std::max<int64_t>(1, config::packed_file_time_threshold_ms / 10);
        std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms));

        // Check if current packed file should be closed due to time threshold
        std::vector<std::string> resources_to_mark;
        {
            std::unique_lock<std::timed_mutex> current_lock(_current_packed_file_mutex,
                                                            std::defer_lock);
            int64_t lock_wait_ms = std::max<int64_t>(0, config::packed_file_try_lock_timeout_ms);
            if (current_lock.try_lock_for(std::chrono::milliseconds(lock_wait_ms))) {
                for (auto& [resource_id, state] : _current_packed_files) {
                    if (!state || state->state != PackedFileState::ACTIVE) {
                        continue;
                    }
                    if (!state->first_append_timestamp.has_value()) {
                        continue;
                    }
                    auto current_time = std::chrono::steady_clock::now();
                    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                              current_time - *(state->first_append_timestamp))
                                              .count();
                    if (elapsed_ms >= config::packed_file_time_threshold_ms) {
                        resources_to_mark.push_back(resource_id);
                    }
                }
            }
        }
        for (const auto& resource_id : resources_to_mark) {
            Status st = mark_current_packed_file_for_upload(resource_id);
            if (!st.ok()) {
                LOG(WARNING) << "Failed to close current packed file for resource " << resource_id
                             << ": " << st.to_string();
            }
        }

        // Process uploading files
        process_uploading_packed_files();

        auto now = std::chrono::steady_clock::now();
        int64_t cleanup_interval_sec =
                std::max<int64_t>(1, config::packed_file_cleanup_interval_seconds);
        auto cleanup_interval = std::chrono::seconds(cleanup_interval_sec);
        if (now - last_cleanup_time >= cleanup_interval) {
            cleanup_expired_data();
            last_cleanup_time = now;
        }
    }
}

// Advances every packed file in _uploading_packed_files by one step. Called from
// background_manager() on each iteration.
//
// - READY_TO_UPLOAD files: publish their PackedFileInfoPB to the meta service, record
//   metrics, append the info trailer, then call finalize_packed_file_upload(). On success the
//   file moves to UPLOADING (or straight to UPLOADED if the writer reports ALREADY_CLOSED).
// - UPLOADING files: once the writer reports CLOSED, close(true) is called again and an
//   ALREADY_CLOSED result is taken as completion.
//
// Any failure moves the file to FAILED. Files reaching UPLOADED or FAILED are moved from the
// uploading map to the uploaded map and waiters on upload_cv are notified.
void PackedFileManager::process_uploading_packed_files() {
    std::vector<std::shared_ptr<PackedFileContext>> files_ready;
    std::vector<std::shared_ptr<PackedFileContext>> files_uploading;
    // Stamps uploading_timestamp the first time it is called for a file and records the
    // READY_TO_UPLOAD -> UPLOADING latency (logged as -1 when the ready timestamp is missing).
    auto record_ready_to_upload = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
        if (!packed_file->uploading_timestamp.has_value()) {
            packed_file->uploading_timestamp = std::chrono::steady_clock::now();
            int64_t duration_ms = -1;
            if (packed_file->ready_to_upload_timestamp.has_value()) {
                duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                      *packed_file->uploading_timestamp -
                                      *packed_file->ready_to_upload_timestamp)
                                      .count();
                g_packed_file_ready_to_upload_ms_recorder << duration_ms;
                if (auto* sampler = g_packed_file_ready_to_upload_ms_recorder.get_sampler()) {
                    sampler->take_sample();
                }
            }
            LOG(INFO) << "Packed file " << packed_file->packed_file_path
                      << " transition READY_TO_UPLOAD->UPLOADING; "
                         "ready_to_upload_ms="
                      << duration_ms;
        }
    };

    // Snapshot the work under the lock; the meta service call and writer operations below
    // run without holding _packed_files_mutex.
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        for (auto& [packed_file_path, packed_file] : _uploading_packed_files) {
            auto state = packed_file->state.load(std::memory_order_acquire);
            if (state != PackedFileState::READY_TO_UPLOAD && state != PackedFileState::UPLOADING) {
                continue;
            }
            if (state == PackedFileState::READY_TO_UPLOAD) {
                files_ready.emplace_back(packed_file);
            } else {
                files_uploading.emplace_back(packed_file);
            }
        }
    }

    // Finishes a successful upload: logs the per-phase durations (-1 when a timestamp is
    // missing), publishes UPLOADED under upload_mutex, wakes waiters, and moves the file from
    // the uploading map to the uploaded map.
    auto handle_success = [&](const std::shared_ptr<PackedFileContext>& packed_file) {
        auto now = std::chrono::steady_clock::now();
        int64_t active_to_ready_ms = -1;
        int64_t ready_to_upload_ms = -1;
        int64_t uploading_to_uploaded_ms = -1;
        if (packed_file->first_append_timestamp.has_value() &&
            packed_file->ready_to_upload_timestamp.has_value()) {
            active_to_ready_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                         *packed_file->ready_to_upload_timestamp -
                                         *packed_file->first_append_timestamp)
                                         .count();
        }
        if (packed_file->ready_to_upload_timestamp.has_value() &&
            packed_file->uploading_timestamp.has_value()) {
            ready_to_upload_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                         *packed_file->uploading_timestamp -
                                         *packed_file->ready_to_upload_timestamp)
                                         .count();
        }
        if (packed_file->uploading_timestamp.has_value()) {
            uploading_to_uploaded_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                               now - *packed_file->uploading_timestamp)
                                               .count();
            g_packed_file_uploading_to_uploaded_ms_recorder << uploading_to_uploaded_ms;
            if (auto* sampler = g_packed_file_uploading_to_uploaded_ms_recorder.get_sampler()) {
                sampler->take_sample();
            }
        }
        LOG(INFO) << "Packed file " << packed_file->packed_file_path
                  << " upload completed; active_to_ready_ms=" << active_to_ready_ms
                  << ", ready_to_upload_ms=" << ready_to_upload_ms
                  << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms;
        // The state change is made under upload_mutex, the same mutex
        // wait_for_packed_file_upload() holds while checking its predicate.
        {
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
            packed_file->state = PackedFileState::UPLOADED;
            packed_file->upload_time = std::time(nullptr);
        }
        packed_file->upload_cv.notify_all();
        {
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
            _uploading_packed_files.erase(packed_file->packed_file_path);
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
        }
    };

    // Finishes a failed upload: records the error text in last_error, publishes FAILED under
    // upload_mutex, wakes waiters, and moves the file to the uploaded map (which therefore
    // holds both UPLOADED and FAILED files).
    auto handle_failure = [&](const std::shared_ptr<PackedFileContext>& packed_file,
                              const Status& status) {
        LOG(WARNING) << "Failed to upload packed file: " << packed_file->packed_file_path
                     << ", error: " << status.to_string();
        {
            std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
            packed_file->state = PackedFileState::FAILED;
            packed_file->last_error = status.to_string();
            packed_file->upload_time = std::time(nullptr);
        }
        packed_file->upload_cv.notify_all();
        {
            std::lock_guard<std::mutex> lock(_packed_files_mutex);
            _uploading_packed_files.erase(packed_file->packed_file_path);
            _uploaded_packed_files[packed_file->packed_file_path] = packed_file;
        }
    };

    // Phase 1: files that were rotated but whose upload has not been started yet.
    for (auto& packed_file : files_ready) {
        const std::string& packed_file_path = packed_file->packed_file_path;
        // Describe the packed file and each of its slices for the meta service and trailer.
        cloud::PackedFileInfoPB packed_file_info;
        packed_file_info.set_ref_cnt(packed_file->slice_locations.size());
        packed_file_info.set_total_slice_num(packed_file->slice_locations.size());
        packed_file_info.set_total_slice_bytes(packed_file->total_size);
        packed_file_info.set_remaining_slice_bytes(packed_file->total_size);
        packed_file_info.set_created_at_sec(packed_file->create_time);
        packed_file_info.set_corrected(false);
        packed_file_info.set_state(doris::cloud::PackedFileInfoPB::NORMAL);
        packed_file_info.set_resource_id(packed_file->resource_id);

        for (const auto& [small_file_path, index] : packed_file->slice_locations) {
            auto* small_file = packed_file_info.add_slices();
            small_file->set_path(small_file_path);
            small_file->set_offset(index.offset);
            small_file->set_size(index.size);
            small_file->set_deleted(false);
            // Optional identifiers are only written when they carry a non-default value.
            if (index.tablet_id != 0) {
                small_file->set_tablet_id(index.tablet_id);
            }
            if (!index.rowset_id.empty()) {
                small_file->set_rowset_id(index.rowset_id);
            }
            if (index.txn_id != 0) {
                small_file->set_txn_id(index.txn_id);
            }
        }

        // The meta service is updated before the trailer is written and the file is closed.
        Status meta_status = update_meta_service(packed_file->packed_file_path, packed_file_info);
        if (!meta_status.ok()) {
            LOG(WARNING) << "Failed to update meta service for packed file: "
                         << packed_file->packed_file_path << ", error: " << meta_status.to_string();
            handle_failure(packed_file, meta_status);
            continue;
        }

        // Record stats once the packed file metadata is persisted.
        record_packed_file_metrics(*packed_file);

        Status trailer_status = append_packed_info_trailer(
                packed_file->writer.get(), packed_file->packed_file_path, packed_file_info);
        if (!trailer_status.ok()) {
            handle_failure(packed_file, trailer_status);
            continue;
        }

        // Now upload the file
        if (!packed_file->slice_locations.empty()) {
            std::ostringstream oss;
            oss << "Uploading packed file " << packed_file_path << " with "
                << packed_file->slice_locations.size() << " small files: ";
            bool first = true;
            for (const auto& [small_file_path, index] : packed_file->slice_locations) {
                if (!first) {
                    oss << ", ";
                }
                first = false;
                oss << "[" << small_file_path << ", offset=" << index.offset
                    << ", size=" << index.size << "]";
            }
            LOG(INFO) << oss.str();
        } else {
            LOG(INFO) << "Uploading packed file " << packed_file_path << " with no small files";
        }

        // NOTE(review): finalize_packed_file_upload() calls writer->close(true); presumably
        // this starts a non-blocking close that completes later - confirm against FileWriter.
        Status upload_status = finalize_packed_file_upload(packed_file->packed_file_path,
                                                           packed_file->writer.get());

        // A writer that reports ALREADY_CLOSED is treated as a completed upload.
        if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
            record_ready_to_upload(packed_file);
            handle_success(packed_file);
            continue;
        }
        if (!upload_status.ok()) {
            handle_failure(packed_file, upload_status);
            continue;
        }

        // Close was accepted: completion is detected in phase 2 of a later call.
        record_ready_to_upload(packed_file);
        packed_file->state = PackedFileState::UPLOADING;
    }

    // Phase 2: files whose close was started by an earlier call.
    for (auto& packed_file : files_uploading) {
        if (!packed_file->writer) {
            handle_failure(packed_file,
                           Status::InternalError("File writer is null for packed file: {}",
                                                 packed_file->packed_file_path));
            continue;
        }

        // Not closed yet: check again on the next call.
        if (packed_file->writer->state() != FileWriter::State::CLOSED) {
            continue;
        }

        // The writer reports CLOSED; calling close(true) again yields ALREADY_CLOSED on
        // success, an OK status leaves the file for a later pass, and any other status is a
        // failure.
        Status status = packed_file->writer->close(true);
        if (status.is<ErrorCode::ALREADY_CLOSED>()) {
            handle_success(packed_file);
            continue;
        }
        if (status.ok()) {
            continue;
        }

        handle_failure(packed_file, status);
    }
}

// Closes the writer of a packed file by calling close(true) and returns that call's status.
// Returns InternalError when `writer` is null; `packed_file_path` is used only for that
// error message.
// NOTE(review): the `true` argument presumably requests a non-blocking close - confirm
// against FileWriter::close.
Status PackedFileManager::finalize_packed_file_upload(const std::string& packed_file_path,
                                                      FileWriter* writer) {
    if (writer != nullptr) {
        return writer->close(true);
    }
    return Status::InternalError("File writer is null for packed file: " + packed_file_path);
}

// Sends the PackedFileInfoPB of `packed_file_path` to the cloud meta service through
// CloudMetaMgr::update_packed_file_info and returns its status.
//
// Returns InternalError when not running in cloud mode. Under BE_TEST the sync point
// "PackedFileManager::update_meta_service" can short-circuit the call with a provided value.
Status PackedFileManager::update_meta_service(const std::string& packed_file_path,
                                              const cloud::PackedFileInfoPB& packed_file_info) {
#ifdef BE_TEST
    TEST_SYNC_POINT_RETURN_WITH_VALUE("PackedFileManager::update_meta_service", Status::OK(),
                                      packed_file_path, &packed_file_info);
#endif
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
               << packed_file_info.total_slice_num() << " small files"
               << ", total bytes: " << packed_file_info.total_slice_bytes();

    if (config::is_cloud_mode()) {
        // The meta manager is reached through the cloud storage engine owned by ExecEnv.
        auto& meta_mgr = ExecEnv::GetInstance()->storage_engine().to_cloud().meta_mgr();
        return meta_mgr.update_packed_file_info(packed_file_path, packed_file_info);
    }

    return Status::InternalError("Storage engine is not cloud mode");
}

void PackedFileManager::cleanup_expired_data() {
    auto current_time = std::time(nullptr);

    // Clean up expired uploaded files
    {
        std::lock_guard<std::mutex> uploaded_lock(_packed_files_mutex);
        auto it = _uploaded_packed_files.begin();
        while (it != _uploaded_packed_files.end()) {
            if (it->second->state == PackedFileState::UPLOADED &&
                current_time - it->second->upload_time > config::uploaded_file_retention_seconds) {
                it = _uploaded_packed_files.erase(it);
            } else if (it->second->state == PackedFileState::FAILED &&
                       current_time - it->second->upload_time >
                               config::uploaded_file_retention_seconds) {
                it = _uploaded_packed_files.erase(it);
            } else {
                ++it;
            }
        }
    }

    // Clean up expired global index entries
    {
        std::unordered_set<std::string> active_packed_files;
        {
            std::lock_guard<std::timed_mutex> current_lock(_current_packed_file_mutex);
            for (const auto& [resource_id, state] : _current_packed_files) {
                if (state) {
                    active_packed_files.insert(state->packed_file_path);
                }
            }
        }
        {
            std::lock_guard<std::mutex> merge_lock(_packed_files_mutex);
            for (const auto& [path, state] : _uploading_packed_files) {
                active_packed_files.insert(path);
            }
            for (const auto& [path, state] : _uploaded_packed_files) {
                active_packed_files.insert(path);
            }
        }

        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
        auto it = _global_slice_locations.begin();
        while (it != _global_slice_locations.end()) {
            const auto& index = it->second;
            if (active_packed_files.find(index.packed_file_path) == active_packed_files.end()) {
                it = _global_slice_locations.erase(it);
            } else {
                ++it;
            }
        }
    }
}

#ifdef BE_TEST
namespace {
// Brings `adder` back to zero by adding the negation of its current value.
// Nothing is added when the adder already reads zero.
void reset_adder(bvar::Adder<int64_t>& adder) {
    const auto current = adder.get_value();
    if (current == 0) {
        return;
    }
    adder << (-current);
}
} // namespace

// Test-only helper that resets the packed-file bvar metrics defined in this file:
// the three adders are brought back to zero, the five recorders are reset, and then each
// recorder's sampler (when it has one) is asked to take a sample.
void PackedFileManager::reset_packed_file_bvars_for_test() const {
    // Cumulative counters.
    reset_adder(g_packed_file_total_count);
    reset_adder(g_packed_file_total_small_file_count);
    reset_adder(g_packed_file_total_size_bytes);

    // Recorders; all of them are reset before any sample is taken.
    g_packed_file_small_file_num_recorder.reset();
    g_packed_file_file_size_recorder.reset();
    g_packed_file_active_to_ready_ms_recorder.reset();
    g_packed_file_ready_to_upload_ms_recorder.reset();
    g_packed_file_uploading_to_uploaded_ms_recorder.reset();

    // NOTE(review): sampling right after the reset presumably lets the windowed views observe
    // the cleared recorders -- confirm against bvar's sampler semantics.
    const auto sample_if_present = [](auto& recorder) {
        if (auto* sampler = recorder.get_sampler()) {
            sampler->take_sample();
        }
    };
    sample_if_present(g_packed_file_small_file_num_recorder);
    sample_if_present(g_packed_file_file_size_recorder);
    sample_if_present(g_packed_file_active_to_ready_ms_recorder);
    sample_if_present(g_packed_file_ready_to_upload_ms_recorder);
    sample_if_present(g_packed_file_uploading_to_uploaded_ms_recorder);
}

// Test-only accessor: current value of the `g_packed_file_total_count` adder.
int64_t PackedFileManager::packed_file_total_count_for_test() const {
    const int64_t total_count = g_packed_file_total_count.get_value();
    return total_count;
}

// Test-only accessor: current value of the `g_packed_file_total_small_file_count` adder.
int64_t PackedFileManager::packed_file_total_small_file_num_for_test() const {
    const int64_t small_file_count = g_packed_file_total_small_file_count.get_value();
    return small_file_count;
}

// Test-only accessor: current value of the `g_packed_file_total_size_bytes` adder.
int64_t PackedFileManager::packed_file_total_size_bytes_for_test() const {
    const int64_t total_bytes = g_packed_file_total_size_bytes.get_value();
    return total_bytes;
}

// Test-only accessor: average, as a double, reported by the
// `g_packed_file_avg_small_file_num` window.
double PackedFileManager::packed_file_avg_small_file_num_for_test() const {
    auto window_value = g_packed_file_avg_small_file_num.get_value();
    return window_value.get_average_double();
}

// Test-only accessor: average, as a double, reported by `g_packed_file_avg_file_size_bytes`.
double PackedFileManager::packed_file_avg_file_size_for_test() const {
    auto window_value = g_packed_file_avg_file_size_bytes.get_value();
    return window_value.get_average_double();
}

// Test-only entry point that forwards `*packed_file` to `record_packed_file_metrics`.
// `packed_file` must not be null; this is only checked through DCHECK.
void PackedFileManager::record_packed_file_metrics_for_test(
        const PackedFileManager::PackedFileContext* packed_file) {
    DCHECK(packed_file != nullptr);
    const auto& context = *packed_file;
    record_packed_file_metrics(context);
}

void PackedFileManager::clear_state_for_test() {
    std::lock_guard<std::timed_mutex> cur_lock(_current_packed_file_mutex);
    _current_packed_files.clear();
    {
        std::lock_guard<std::mutex> lock(_packed_files_mutex);
        _uploading_packed_files.clear();
        _uploaded_packed_files.clear();
    }
    {
        std::lock_guard<std::mutex> lock(_global_index_mutex);
        _global_slice_locations.clear();
    }
}
#endif

} // namespace doris::io
